滑动窗口限流器
它是什么?
如果说缓存和双锁机制(DCL)是为了保护后端数据,那么滑动窗口限流器就是为了保护整个系统入口,防止被恶意刷接口(防刷)或流量激增冲垮。那么究竟什么是滑动窗口限流器?它的实现原理又是是怎样的?它和普通的固定窗口又有什么本质的区别呢?我们在这里使用一个通俗的例子帮你认识它。
想象一个火车站闸机,规定 “每分钟只能进入 100 人”。
- 固定窗口(老办法):闸机在 10:00:00 准时重置计数器。如果 10:00:59 秒进来了 100 人,10:01:01 又进来了 100 人。虽然每一分钟都没超标,但在那 2 秒钟内瞬时涌入了 200 人,闸机可能会被挤爆。这就是“临界突刺”问题。
- 滑动窗口(新办法):闸机不再盯着表看。它记下每一个进站人的确切时间戳。当第 101 个人想进站时,闸机往回看正好 60 秒(比如从 10:00:05 到 10:01:05)。如果这 60 秒内已经有 100 人了,就不让进。这个窗口是随着时间不停向前滑动的,没有任何死角。
它是怎么实现的?
这种滑动窗口核心是基于 redis zset 这种数据结构以及 redis lua 操作的原子性实现的,核心原因如下:
打点记录 (ZADD): 每当用户访问一次,就在 ZSET 里加一个值。Key 是用户 ID,Score 和 Value 都是当前毫秒时间戳。
清理过期数据 (ZREMRANGEBYSCORE): 删掉窗口之外的数据。比如窗口是 60 秒,那就删掉所有 Score < (当前时间 - 60秒) 的数据。
计算数量 (ZCARD): 统计剩下的数量。如果 ZCARD > 阈值,说明请求太快了,直接拒绝。
- 设置过期时间 (EXPIRE): 给这个 ZSET 设个过期时间(比如 5 分钟),防止僵尸数据占内存。
它能解决哪些问题?
精准打击非法脚本: 刷票软件或爬虫通常每 0.5 秒请求一次。固定窗口可能拦不住它们(如果跨窗口了),但滑动窗口能把它们的请求记录连成一条线,一旦频率超标,立刻封锁。
平滑流量: 它让流量均匀地进入系统,不会出现“前一秒闲死,后一秒挤死”的极端波动。
分片桶(Bucket)的理念: 由于每个用户拥有独立的 ZSET,这就像把流量分流到了成千上万个小桶里。某个用户的疯狂请求只会让自己的 “桶” 溢出(被限流),完全不会影响其他正常用户的访问。
分布式限流器额和本地限流器的比较
分布式全局观 vs 单机孤岛
假设你有 10 个服务器节点,每个节点配置限流 100 QPS。
本地限流(孤岛): 总流量可能会冲到 1000 QPS 才能触发限流。如果你的数据库(DB)只能抗住 300 QPS,那么本地限流完全保护不了你的 DB。流量分配是不均匀的。可能节点 A 挤爆了在限流,而节点 B 闲得发慌。
Redis 限流(全局): 所有节点共用一个“计数桶”。无论请求打到哪个节点,都会汇总到 Redis。它能精准控制总流量。你说限流 100,那全集群加起来就是 100。这对于保护下游的单点资源(如数据库、第三方支付接口)至关重要。
用户状态的连贯性(防刷场景)
这是 Redis 限流器真正的“杀手锏”:
- 本地限流: 如果一个恶意用户疯狂刷接口,只要他每次请求被负载均衡分发到不同的节点,他就能绕过单机的限流阈值。
- 分布式限流: 因为 Key 是基于
用户ID 或 IP 的,无论他怎么换节点,他在 Redis 里的记录都是累计的。结果恶意刷票、爬虫在分布式环境下无处遁形,被精准封杀。
生产环境的建议
就是那句经典的:“我全都要!” 😀
分布式滑动窗口限流器的关键代码
AOP 切面层
RateLimiterAspect:
限流功能毕竟是核心业务之外的装饰功能,当出现 Redis 宕机、网络抖动或集群切主时,应用系统必须能够实现降级或切换,保证核心业务的可用!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| import com.demo.componet.limiter.RateLimiter; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.core.annotation.Order; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.util.Collections;
@Aspect @Component @Order(0) @Slf4j public class RateLimiterAspect { @Value("${owlias.limiter.enabled:true}") private boolean limiterEnabled;
@Resource private StringRedisTemplate masterStringRedisTemplate;
private static final DefaultRedisScript<Long> LIMIT_SCRIPT; static { LIMIT_SCRIPT = new DefaultRedisScript<>(); LIMIT_SCRIPT.setLocation(new ClassPathResource("lua/sliding_window_rate_limit.lua")); LIMIT_SCRIPT.setResultType(Long.class); }
@Around("@annotation(rateLimiter)") public Object doAround(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable { if (!limiterEnabled) { return joinPoint.proceed(); } String keySuffix = SpringExpressionUtils.parseSpel(rateLimiter.keyExpression(), joinPoint); String finalKey = rateLimiter.prefix() + keySuffix;
long result = 1L;
try { result = masterStringRedisTemplate.execute( LIMIT_SCRIPT, Collections.singletonList(finalKey), String.valueOf(System.currentTimeMillis()), String.valueOf(rateLimiter.window() * 1000L), String.valueOf(rateLimiter.max()) ); } catch (Exception e) { log.error("Redis 限流组件异常,已自动降级放行。Key: {}, 异常原因: {}", finalKey, e.getMessage()); }
if (result == 0) { log.warn("Rate limit exceeded for key: {}", finalKey); throw new RuntimeException(rateLimiter.message()); }
return joinPoint.proceed(); } }
|
LUA 脚本支持
lua/sliding_window_rate_limit.lua
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
local window_start = tonumber(ARGV[1]) - tonumber(ARGV[2]) local current_key = KEYS[1]
redis.call('ZREMRANGEBYSCORE', current_key, 0, window_start)
local current_requests = redis.call('ZCARD', current_key)
if current_requests < tonumber(ARGV[3]) then redis.call('ZADD', current_key, ARGV[1], ARGV[1] .. math.random()) redis.call('EXPIRE', current_key, math.ceil(tonumber(ARGV[2]) / 1000) + 60) return 1 else return 0 end
|
核心应用组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RateLimiter { String prefix() default "rate_limit:";
String keyExpression();
int window() default 60;
int max() default 100;
String message() default "操作过于频繁,请稍后再试"; }
|
相关测试
业务演示类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Service public class EmployeeService {
@RateLimiter(prefix = "rate_limit:emp:", keyExpression = "#id", window = 10, max = 5, message = "查询太快啦,喝杯茶再来") @RedisJsonReadingCache(prefix = "emp:", keyExpression = "#id", expireTime = 900L) public EmployeeDoc getById(String id) { return repositoryGetById(id); }
@RateLimiter(keyExpression = "T(com.demo.utils.IpUtils).getIp()") public EmployeeDoc getById2(String id) { return repositoryGetById(id); } }
|
测试单元
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import com.demo.App; import com.demo.model.EmployeeDoc; import com.demo.service.EmployeeService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows;
@Slf4j @SpringBootTest(classes = App.class) public class RateLimiterTest {
@Resource private EmployeeService employeeService;
@Test @DisplayName("测试按 ID 限流:10秒内允许5次,第6次应报错") public void testIdRateLimiting() { String testId = "1001";
for (int i = 1; i <= 5; i++) { employeeService.getById(testId); log.info("第 {} 次请求成功", i); }
RuntimeException exception = assertThrows(RuntimeException.class, () -> { employeeService.getById(testId); });
log.info("捕获到预期限流异常: {}", exception.getMessage()); assert exception.getMessage().equals("查询太快啦,喝杯茶再来"); }
@Test @DisplayName("测试按 IP 限流:连续快速访问") public void testIpRateLimiting() { assertThrows(RuntimeException.class, () -> { for (int i = 0; i < 110; i++) { employeeService.getById2("any-id"); } }); log.info("IP 限流测试通过"); }
@Test @DisplayName("测试限流降级:Redis 挂了也应该能查到数据") public void testRateLimiterFallback() {
EmployeeDoc emp = employeeService.getById("1001");
assertNotNull(emp); log.info("限流器降级测试通过:Redis 异常时业务未受影响"); } }
|
redis zset 生成的演示数据:
1 2 3 4 5 6
| 192.168.1.224:7001> zrange rate_limit:emp:1001 0 -1 1) "17708793791610.015582849278852" 2) "17708793827460.58409022054825" 3) "17708793840700.15936862638191" 4) "17708793849440.38371587469416" 5) "17708793858570.69100437345496"
|